{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Online Feature Store\n",
    "\n",
    "This notebook includes examples of how to interact with the **Online** Feature Store in Hopsworks. The online feature store stores a subset of the feature data for real-time queries, suited for serving client-facing models. \n",
    "\n",
    "The online feature store contrasts to the **offline** feature store. The offline feature store contains historical data. The offline feature data is stored in Hive, a storage engine suited for large scale batch processing of data (such as *training* a machine learning model). On the other hand, the online feature store uses MySQL-Cluster database as the backend, a storage engine suited for smaller datasets that need to be queried in real-time."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>26</td><td>application_1569258228210_0030</td><td>spark</td><td>idle</td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8088/proxy/application_1569258228210_0030/\">Link</a></td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8042/node/containerlogs/container_e01_1569258228210_0030_01_000001/demo_featurestore_admin000__meb10000\">Link</a></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n",
      "import io.hops.util.Hops\n",
      "import scala.collection.JavaConversions._\n",
      "import collection.JavaConverters._\n",
      "import org.apache.spark.sql.Row\n",
      "import java.sql.Date\n",
      "import java.sql.Timestamp\n",
      "import spark.implicits._\n",
      "import org.apache.spark.sql.types._\n"
     ]
    }
   ],
   "source": [
    "import io.hops.util.Hops\n",
    "import scala.collection.JavaConversions._\n",
    "import collection.JavaConverters._\n",
    "import org.apache.spark.sql.Row\n",
    "import java.sql.Date\n",
    "import java.sql.Timestamp\n",
    "import spark.implicits._\n",
    "import org.apache.spark.sql.types._"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Get JDBC Connection to the Online Feature Store\n",
    "\n",
    "If your project's feature store has the online feature store enabled, there will be a storage connector for each user to access the online feature store. The storage connector can be accessed using the utility method `getOnlineFeaturestoreConnector()` in the Scala SDK. The storage connector includes information about the JDBC connection, the password, port, host, and username etc."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "res1: io.hops.util.featurestore.dtos.storageconnector.FeaturestoreJdbcConnectorDTO = FeaturestoreJdbcConnectorDTO{connectionString='jdbc:mysql://10.0.2.15:3306/demo_featurestore_admin000', arguments='password=YTnQHFxNHwMlEpZboyJCgpZFSqyyKgQHXnUHJzSrVNhOslGRqKifTmzvRnhudipF,user=demo_featurestore_admin000_meb1'}\n"
     ]
    }
   ],
   "source": [
    "Hops.getOnlineFeaturestoreConnector.read"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create a Feature Group with Online Feature Serving Enabled.\n",
    "\n",
    "When a feature group has online feature serving enabled, it means that its data will be stored in both Hive (for historical queries) and MySQL Cluster (for online queries). To enable online feature serving of a feature group simply set the flag `online=True` when creating a feature group, as illustrated below."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create Sample Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "sampleData: Seq[org.apache.spark.sql.Row] = List([1,2019-03-02,0.4151,Sweden], [2,2019-05-01,1.2151,Ireland], [3,2019-08-06,0.2151,Belgium], [4,2019-08-06,0.8151,Russia])\n",
      "schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true))\n",
      "sampleDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n"
     ]
    }
   ],
   "source": [
    "val sampleData = Seq(\n",
    "    Row(1, Date.valueOf(\"2019-02-30\"), 0.4151f, \"Sweden\"),\n",
    "    Row(2, Date.valueOf(\"2019-05-01\"), 1.2151f, \"Ireland\"),\n",
    "    Row(3, Date.valueOf(\"2019-08-06\"), 0.2151f, \"Belgium\"),\n",
    "    Row(4, Date.valueOf(\"2019-08-06\"), 0.8151f, \"Russia\")\n",
    ")\n",
    "val schema = \n",
    " scala.collection.immutable.List(\n",
    "  StructField(\"id\", IntegerType, true),\n",
    "  StructField(\"date\", DateType, true),\n",
    "  StructField(\"value\", FloatType, true),\n",
    "  StructField(\"country\", StringType, true) \n",
    ")\n",
    "val sampleDf = spark.createDataFrame(\n",
    "  spark.sparkContext.parallelize(sampleData),\n",
    "  StructType(schema)\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+----------+------+-------+\n",
      "| id|      date| value|country|\n",
      "+---+----------+------+-------+\n",
      "|  1|2019-03-02|0.4151| Sweden|\n",
      "|  2|2019-05-01|1.2151|Ireland|\n",
      "|  3|2019-08-06|0.2151|Belgium|\n",
      "|  4|2019-08-06|0.8151| Russia|\n",
      "+---+----------+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sampleDf.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Save the Sample Data as a Feature group with online feature serving enabled\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "(Hops.createFeaturegroup(\"test_online_fg_scala_sdk\")\n",
    "     .setDataframe(sampleDf)\n",
    "     .setOnline(true)\n",
    "     .setPrimaryKey(List(\"id\"))\n",
    "     .write())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When creating a feature group, the spark dataframe is used to infer the data-schema and the feature types. The data schema is then used to create a Hive table (for offline data) and a MySQL table (for online data). If you want to have more control over the feature types for the MySQL table (e.g length of a varchar column) you can pass in the types in the optional argument `onlineTypes`, which takes a map of the form `feature_name --> feature_type`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "onlineTypes: scala.collection.immutable.Map[String,String] = Map(value -> DECIMAL)\n",
      "sampleDf2: org.apache.spark.sql.DataFrame = [id: int, date_test: date ... 2 more fields]\n"
     ]
    }
   ],
   "source": [
    "val onlineTypes = Map[String, String](\n",
    "    \"value\" -> \"DECIMAL\"\n",
    ")\n",
    "val sampleDf2 = sampleDf.withColumnRenamed(\n",
    "    \"value\", \"value_test\").withColumnRenamed(\n",
    "    \"country\", \"country_test\").withColumnRenamed(\n",
    "    \"date\", \"date_test\")\n",
    "(Hops.createFeaturegroup(\"test_online_fg_scala_sdk_types\")\n",
    "     .setDataframe(sampleDf2)\n",
    "     .setOnline(true)\n",
    "     .setPrimaryKey(List(\"id\"))\n",
    "     .setOnlineTypes(onlineTypes)\n",
    "     .write())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Read Features from Online Feature Store\n",
    "\n",
    "The same methods for reading the offline feature store can be used to read from the online feature store by setting the argument `online=True`. **However, NOTE**: as the online feature store is supposed to be used for feature serving, it should be queried with primary-key lookups for getting the best performance. In fact, it is highly discouraged to use the online feature serving for doing full-table-scans. If you find yourself frequently needing to use `get_featuregroup(online=True)` to get the entire feature group (full-table scan), you are probably better of using the offline feature store. The online feature store is intended for quick primary key lookups, not data analysis.\n",
    "\n",
    "To make the migration from the regular offline-featurestore API to the online-featurestore simple, for each example of reading from the online featurestore below, there is an accompanying example of reading from the offline feature store."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Free-Text SQL Query for the Online Feature Store\n",
    "\n",
    "Featuregroups are stored as tables with the naming `featuregroupname_version` as Hive tables for offline features, and MySQL tables for online features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]\n"
     ]
    }
   ],
   "source": [
    "//primary key lookup in MySQL\n",
    "val df = (Hops.queryFeaturestore(\"SELECT country FROM test_online_fg_scala_sdk_1 WHERE id=1\")\n",
    "     .setOnline(true).read)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+\n",
      "|country|\n",
      "+-------+\n",
      "| Sweden|\n",
      "+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Free-Text SQL Query for the Offline Feature Store"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]\n"
     ]
    }
   ],
   "source": [
    "//primary key lookup in MySQL\n",
    "val df = (Hops.queryFeaturestore(\"SELECT country FROM test_online_fg_scala_sdk_1 WHERE id=1\")\n",
    "     .setOnline(false).read)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+\n",
      "|country|\n",
      "+-------+\n",
      "| Sweden|\n",
      "+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Online Version of Feature Group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, date: date ... 2 more fields]\n"
     ]
    }
   ],
   "source": [
    "val df = Hops.getFeaturegroup(\"test_online_fg_scala_sdk\").setOnline(true).read"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+----------+------+-------+\n",
      "| id|      date| value|country|\n",
      "+---+----------+------+-------+\n",
      "|  1|2019-03-02|0.4151| Sweden|\n",
      "|  3|2019-08-06|0.2151|Belgium|\n",
      "|  4|2019-08-06|0.8151| Russia|\n",
      "|  2|2019-05-01|1.2151|Ireland|\n",
      "+---+----------+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Offline Version of Feature Group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, date: date ... 2 more fields]\n"
     ]
    }
   ],
   "source": [
    "val df = Hops.getFeaturegroup(\"test_online_fg_scala_sdk\").setOnline(false).read"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+----------+------+-------+\n",
      "| id|      date| value|country|\n",
      "+---+----------+------+-------+\n",
      "|  1|2019-03-02|0.4151| Sweden|\n",
      "|  2|2019-05-01|1.2151|Ireland|\n",
      "|  3|2019-08-06|0.2151|Belgium|\n",
      "|  4|2019-08-06|0.8151| Russia|\n",
      "+---+----------+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Online Version of individual Feature"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]\n"
     ]
    }
   ],
   "source": [
    "val df = Hops.getFeature(\"country\").setOnline(true).read"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+\n",
      "|country|\n",
      "+-------+\n",
      "| Sweden|\n",
      "|Belgium|\n",
      "| Russia|\n",
      "|Ireland|\n",
      "+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Offline Version of individual Feature"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]\n"
     ]
    }
   ],
   "source": [
    "val df = Hops.getFeature(\"country\").setOnline(false).read"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+\n",
      "|country|\n",
      "+-------+\n",
      "| Sweden|\n",
      "|Ireland|\n",
      "|Belgium|\n",
      "| Russia|\n",
      "+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Online Version of a list of individual Features\n",
    "\n",
    "The featues can potentially span multiple feature groups, as long as all feature groups have online serving enabled, the feature store query planner will join the features on the fly."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "features: List[String] = List(country, date)\n",
      "df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string, date: date]\n"
     ]
    }
   ],
   "source": [
    "val features = List(\"country\", \"date\")\n",
    "val df = Hops.getFeatures(features).setOnline(true).read"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----------+\n",
      "|country|      date|\n",
      "+-------+----------+\n",
      "| Sweden|2019-03-02|\n",
      "|Belgium|2019-08-06|\n",
      "| Russia|2019-08-06|\n",
      "|Ireland|2019-05-01|\n",
      "+-------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read Offline Version of a list of individual Features"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "features: List[String] = List(country, date)\n",
      "df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string, date: date]\n"
     ]
    }
   ],
   "source": [
    "val features = List(\"country\", \"date\")\n",
    "val df = Hops.getFeatures(features).setOnline(false).read"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----------+\n",
      "|country|      date|\n",
      "+-------+----------+\n",
      "| Sweden|2019-03-02|\n",
      "|Ireland|2019-05-01|\n",
      "|Belgium|2019-08-06|\n",
      "| Russia|2019-08-06|\n",
      "+-------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### List all feature groups that have online feature serving enabled"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "res1: java.util.List[String] = [test_online_fg_scala_sdk_types_1, online_featuregroup_test_types_1, test_online_fg_scala_sdk_1]\n"
     ]
    }
   ],
   "source": [
    "Hops.getFeaturegroups.setOnline(true).read"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### List all features that have online feature serving enabled"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "res2: java.util.List[String] = [country_test, date_test, id, value_test, id, val_1_type_test, val_2_type_test, country, date, id, value]\n"
     ]
    }
   ],
   "source": [
    "Hops.getFeaturesList.setOnline(true).read"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Enable Online Feature Serving for a Feature Group that is Offline-Only\n",
    "\n",
    "By default when a feature group is created with `create_featuregroup()`, the feature group will not have online serving enabled, all data will be stored in the offline feature group (Hive). To create a feature group with online serving, pass the flag `online=True` to `create_featuregroup()` (an example is provided in the beginning of this notebook).\n",
    "\n",
    "If you want to enable online feature serving for a feature group dynamically, after the feature group have been created, you can use the API call `enable_featuregroup_online` (this will create a MySQL table in the backend). Conversely, if you want to disable online feature serving, use the API call `disable_featuregroup_online` (this will drop the MySQL table in the backend)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create Feature Group without Online Feature Serving Enabled"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "sampleData: Seq[org.apache.spark.sql.Row] = List([1,0.4151,0.915], [2,1.2151,0.151], [3,0.2151,0.7511], [4,0.8151,0.12541])\n",
      "schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(test_col_3,FloatType,true), StructField(test_col_4,FloatType,true))\n",
      "sampleDf: org.apache.spark.sql.DataFrame = [id: int, test_col_3: float ... 1 more field]\n"
     ]
    }
   ],
   "source": [
    "val sampleData = Seq(\n",
    "    Row(1, 0.4151f, 0.915f),\n",
    "    Row(2, 1.2151f, 0.151f),\n",
    "    Row(3, 0.2151f, 0.7511f),\n",
    "    Row(4, 0.8151f, 0.12541f)\n",
    ")\n",
    "val schema = \n",
    " scala.collection.immutable.List(\n",
    "  StructField(\"id\", IntegerType, true),\n",
    "  StructField(\"test_col_3\", FloatType, true),\n",
    "  StructField(\"test_col_4\", FloatType, true)\n",
    ")\n",
    "val sampleDf = spark.createDataFrame(\n",
    "  spark.sparkContext.parallelize(sampleData),\n",
    "  StructType(schema)\n",
    ")\n",
    "\n",
    "(Hops.createFeaturegroup(\"enable_online_features_test_scala_sdk\")\n",
    "                         .setDataframe(sampleDf)\n",
    "                         .setPrimaryKey(List(\"id\"))\n",
    "                         .write())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Enable Online Feature Serving for Offline-Only feature group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "Hops.enableFeaturegroupOnline(\"enable_online_features_test_scala_sdk\").write"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### DIsable Online Feature Serving for a feature group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "Hops.disableFeaturegroupOnline(\"enable_online_features_test_scala_sdk\").write"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Insert into Offline/Online Feature Groups\n",
    "\n",
    "When inserting data into a feature group you can control whether the data should be written only to the offline feature group, only to the online feature group, or to both, using the parameters `online=True` and `offline=True`:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Generate Sample Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "sampleData: Seq[org.apache.spark.sql.Row] = List([5,2015-03-02,2.001,Iran], [6,2016-05-01,3.2171,Canada])\n",
      "schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true))\n",
      "sampleDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n"
     ]
    }
   ],
   "source": [
    "val sampleData = Seq(\n",
    "    Row(5, Date.valueOf(\"2015-02-30\"), 2.001f, \"Iran\"),\n",
    "    Row(6, Date.valueOf(\"2016-05-01\"), 3.2171f, \"Canada\"))\n",
    "val schema = \n",
    " scala.collection.immutable.List(\n",
    "  StructField(\"id\", IntegerType, true),\n",
    "  StructField(\"date\", DateType, true),\n",
    "  StructField(\"value\", FloatType, true),\n",
    "  StructField(\"country\", StringType, true) \n",
    ")\n",
    "val sampleDf = spark.createDataFrame(\n",
    "  spark.sparkContext.parallelize(sampleData),\n",
    "  StructType(schema)\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Insert into Online Feature Group Only"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "(Hops.insertIntoFeaturegroup(\"test_online_fg_scala_sdk\")\n",
    "                         .setDataframe(sampleDf)\n",
    "                         .setMode(\"append\")\n",
    "                         .setOnline(true)\n",
    "                         .setOffline(false)\n",
    "                         .write())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Insert into Offline Feature Group Only"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "(Hops.insertIntoFeaturegroup(\"test_online_fg_scala_sdk\")\n",
    "                         .setDataframe(sampleDf)\n",
    "                         .setMode(\"append\")\n",
    "                         .setOnline(false)\n",
    "                         .setOffline(true)\n",
    "                         .write())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Insert into Online and Offline Feature Group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "(Hops.insertIntoFeaturegroup(\"test_online_fg_scala_sdk\")\n",
    "                         .setDataframe(sampleDf)\n",
    "                         .setMode(\"overwrite\")\n",
    "                         .setOnline(true)\n",
    "                         .setOffline(true)\n",
    "                         .write())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Spark",
   "language": "",
   "name": "sparkkernel"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}